-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Window function fusion #29276
Window function fusion #29276
Conversation
I kept forgetting these matches when adding new aggregate functions. Now the compiler will tell me if I forget one of these.
96b89c9
to
9611c21
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adapter changes LGTM
.clone(); | ||
|
||
let all_func_return_types = | ||
return_type_with_orig_row.unwrap_record_element_type()[0].clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return_type_with_orig_row.unwrap_record_element_type()
which is also used below could be calculated once and stored in a variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below it is slightly different: it's all_func_return_types.unwrap_record_element_type()
/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]` | ||
/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that | ||
/// have only a single argument (first_value/last_value), these are simple values. For functions | ||
/// that have multiple arguments (lag/lead), these are also records. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok that these values will have a type that is not expressible by materialize today (effecticely a union)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's true that this function can handle multiple different types at each argument, and this couldn't be represented by our type system. However, the union fortunately doesn't need to be represented in the type system, because we know the actual type when looking at a specific query.
One way to think about this situation is that AggregateFunc::FusedValueWindowFunc
is kind of a template function, in that it's parameterized by the functions that got fused. fused_value_window_func
is an implementation of all possible instantiations of this template function.
// Let's create a new RowArena, to avoid flooding the caller's arena with stuff proportional to | ||
// the window partition size. We will use our own arena for internal evaluations, and will use | ||
// the caller's at the very end to return the final result Datum. | ||
let temp_storage = RowArena::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What goes wrong if we use the caller's arena? Won't we deallocate it very soon anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, might be ok, but I'm not sure. There are many callers, and I didn't want to check how soon each of them deallocates it.
(window_aggr
does the same thing.)
|
||
let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by); | ||
|
||
let mut encoded_argsss = vec![Vec::new(); funcs.len()]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a typo? or does the sss
signify something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a Haskell naming convention for keeping track of collection nesting depth: when naming a nested collection, we add an s at each level. Like,
- if I have a variable that holds a single argument, it's named
arg
, - if I have a variable that is a collection of arguments, then it's
args
, - if I have a variable that is a collection of collections of arguments, then it's
argss
, - if I have a variable that is a collection of collections of collections of arguments, then it's
argsss
.
The last one is the case here, because encoded_argsss
is a Vec<Vec<Datum>>
, where each Datum
can be a record of multiple arguments. (The outer Vec is over input rows, the inner Vec is over the different constituents of the fused function, and the Datum is the arguments to a single constituent function, e.g., lag's 3 arguments.)
Haskell example: https://stackoverflow.com/questions/66066788/i-am-struggling-to-understand-how-concat-is-read-in-haskell
order_by, | ||
window_frame, | ||
}, | ||
ValueWindowFunc::LastValue => mz_expr::AggregateFunc::LastValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR but why is LagLead
folded into a single aggregate but FirstValue
and LastValue
are different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This predates my time at Materialize, but indeed, it looks like it might be better to fold this into a FirstLastValue
.
/// references that are at a deeper subquery nesting depth, but refer back to the root level. | ||
/// (Note that even if `self` is embedded inside a larger expression, we consider the | ||
/// "root level" to be `self`'s level.) | ||
pub fn visit_columns_referring_to_root_level<F>(&self, f: &mut F) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you give an example of the columns that this visits? What does the usize
passed to the visitor function represent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This visits all column references in the expression, at every level.
The usize given to the visitor is the subquery nesting depth: When recursively descending down the expression, if we are descending into a subquery, we increase the nesting depth.
(Also discussed during the call.)
@@ -455,6 +458,7 @@ impl HirRelationExpr { | |||
requires_nonexistent_column | |||
}) | |||
.unwrap_or(scalars.len()); | |||
assert!(end_idx > 0, "a Map expression references itself or a later column; lowered_arity: {}, expressions: {:?}", lowered_arity, scalars); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this related to this PR or a thing that would be useful independently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useful independently. Without the assert, if there is a bug in the preparation of the HIR expressions such that this condition ends up false, then we get into an infinite loop here, which would be really bad in production. It's much less bad to crash instead. (Without this assert, an envd thread would be spinning forever, and then it would be really hard to debug what went wrong if this happens in production. But even when it happens locally it makes it easier to figure out what's going on.)
During the development of this PR, I twice had bugs where I got into an infinite loop here. I can imagine similar bugs happening in other situations later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went over this a second time with Gabor and everything LGTM!
9611c21
to
b1d8778
Compare
Thank you very much for the reviews! Merging! |
This adds the fusion optimization for value window functions (
lag
,lead
,first_value
,last_value
), which is badly needed by https://github.com/MaterializeInc/accounts/issues/3The point is that it amortizes the various overheads of window functions (e.g., the wrapping/unwrapping of other columns, which gets quadratically big with the number of window functions) by performing the MIR window function pattern only once for a group of window functions.
I've added this as an HIR transform, because an MIR transform would have needed to recognize and manipulate the complicated MIR window function pattern.
See the doc comment on
fuse_window_functions
for how we choose the groups of window functions to be fused.I've added a new value window function called
Fused
, and implemented the corresponding MIRAggregateFunc
. In several places in the implementation ofAggregateFunc::FusedValueWindowFunc
, I refactored the implementations of existing value window functions in order to be able to reuse as much code from them as possible.Note that this does not yet implement fusion for window aggregations, which would be needed by https://github.com/MaterializeInc/accounts/issues/77. I'll implement that in a follow-up PR soon.
Motivation
Tips for reviewer
Look at the commits separately. The first two are just minor preparations (see commit msgs), the 3rd commit is the main one, and then there is a commit that guards the new code behind a feature flag.
Checklist
window_funcs.slt
, there were 20 cases where fusion happened.$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.